定时队列任务:定义不同的通知处理逻辑(邮件与短信)
本节在队列分类体系的基础上,实现 scheduled_tasks 队列的具体消费者逻辑。我们将创建 ScheduledTasksConsumer,在其中定义 sendMail 和 sendSMS 两种不同的任务处理器,并使用 delay 属性实现秒级精度的延迟通知。同时封装 diffNow 时间差计算工具函数,支持指定任意目标时间的延迟任务。
创建 ScheduledTasksConsumer
在 q/services/ 目录下创建消费者,使用 @Process 装饰器为不同任务类型定义独立的处理方法:
// scheduled-tasks.consumer.ts
import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bull';
import { MailerService } from '@nestjs-modules/mailer';
@Processor('scheduled_tasks')
export class ScheduledTasksConsumer {
constructor(private mailService: MailerService) {}
@Process('sendMail')
async handleSendMail(job: Job<ISendMailOptions>) {
const { to, subject, text, html } = job.data;
const res = await this.mailService.sendMail({
to,
subject,
text,
html,
});
console.log('Email sent:', res);
}
@Process('sendSMS')
async handleSendSMS(job: Job<{ phone: string; message: string }>) {
const { phone, message } = job.data;
console.log(`SMS sent to ${phone}: ${message}`);
// Integrate SMS service provider API
}
}
typescript
关键设计:
- 同一个 Consumer 可以定义多个
@Process方法,通过方法名(如'sendMail'、'sendSMS')区分不同的任务类型 job.data的类型参数指定了每种任务所期望的数据结构- 邮件任务直接使用
MailerService,短信任务可对接第三方短信服务商 API
邮件服务的条件加载
由于邮件模块(MailModule)是根据 .env 中 MAIL_ON 属性决定是否加载的,需要在 QModule 中动态判断:
// q.module.ts
import { Module } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
@Module({})
export class QModule {
static register(config: ConfigService): DynamicModule {
const conditionalImports = [];
if (config.get('MAIL_ON') === 'true') {
conditionalImports.push(MailModule);
}
return {
module: QModule,
imports: [
BullModule.forRootAsync({ /* ... */ }),
BullModule.registerQueue({ name: 'scheduled_tasks' }),
...conditionalImports,
],
providers: [ScheduledTasksConsumer],
exports: [BullModule],
};
}
}
typescript
添加带延迟的任务
在 Controller 中向 scheduled_tasks 队列添加延迟任务时,需要指定任务名称和 delay 参数:
// app.controller.ts
import { Controller, Get, InjectQueue } from '@nestjs/common';
import { Queue } from 'bull';
@Controller()
export class AppController {
constructor(
@InjectQueue('scheduled_tasks') private scheduledQueue: Queue,
) {}
@Get('send-mail')
async sendMail() {
await this.scheduledQueue.add('sendMail', {
to: 'user@example.com',
subject: 'Welcome',
html: '<h1>Hello!</h1>',
}, {
delay: 3000, // Delay 3 seconds (in milliseconds)
});
return { status: 'queued' };
}
@Get('send-sms')
async sendSMS() {
await this.scheduledQueue.add('sendSMS', {
phone: '+8613800138000',
message: 'Your verification code is 123456',
}, {
delay: 3000,
});
return { status: 'queued' };
}
}
typescript
queue.add() 三参数说明:
queue.add(
taskName, // String: matches @Process('taskName')
data, // Object: job payload passed to consumer
options, // Object: delay, attempts, backoff, etc.
)
typescript
delay 的单位与注意事项
Bull 的 delay 参数单位是毫秒(milliseconds),不是秒:
// 3 seconds delay
delay: 3000 // Correct: 3 * 1000 = 3000ms
// NOT seconds
delay: 3 // Wrong: this is only 3ms
typescript
封装时间差计算工具
当需要指定"在某年某月某日某时执行"时,需要计算目标时间与当前时间的毫秒差值。创建 diffNow 工具函数:
npm install dayjs
bash
// utils/diff-now.ts
import dayjs from 'dayjs';
import utc from 'dayjs/plugin/utc';
import { UnitType } from 'dayjs';
dayjs.extend(utc);
export function diffNow(
dateString: string,
offset: number = 0,
unit: UnitType = 'millisecond',
): number {
const now = dayjs.utc().add(offset, 'hour');
const targetDate = dayjs.utc(dateString);
const diff = targetDate.diff(now, unit);
return diff;
}
typescript
使用示例:
// Beijing time is UTC+8
const delayMs = diffNow('2026-06-01 10:00:00', 8, 'millisecond');
await this.scheduledQueue.add('sendMail', {
to: 'user@example.com',
subject: 'Happy New Year',
}, {
delay: delayMs, // Execute at the specified time
});
typescript
参数说明:
| 参数 | 类型 | 说明 |
|---|---|---|
dateString | string | 目标时间,格式如 2026-06-01 10:00:00 |
offset | number | UTC 偏移量(北京时间为 8) |
unit | UnitType | 返回值的单位,默认毫秒 |
条件模块的改造
将 QModule 从直接导入改为调用 register 方法,支持传入 ConfigService:
// conditional.module.ts
const configService = app.get(ConfigService);
if (configService.get('QUEUE_ON') === 'true') {
imports.push(QModule.register(configService));
}
typescript
测试验证
- 确保队列的
providers中已添加ScheduledTasksConsumer - 向
GET /send-mail发送请求,3 秒后控制台输出邮件发送日志 - 向
GET /send-sms发送请求,3 秒后控制台输出短信发送日志 - 验证
delay的单位为毫秒,3 秒 =delay: 3000
执行流程总结
Controller Queue (Redis) Consumer
| | |
| add('sendMail', data, | |
| { delay: 3000 }) | |
|----------------------------->| |
| | Wait 3000ms |
| |-------------------------->|
| | | @Process('sendMail')
| | | mailService.sendMail()
| | | console.log('Email sent')
text
本节总结
- 在
ScheduledTasksConsumer中通过@Process('sendMail')和@Process('sendSMS')定义不同的任务处理器 - 使用
queue.add(taskName, data, { delay })实现延迟执行,delay 单位为毫秒 - 邮件模块通过条件加载机制按需注入到 QModule 中
- 封装
diffNow工具函数计算目标时间与当前时间的差值,支持指定任意 UTC 偏移 - 同一个 Consumer 可以处理多种类型的任务,通过任务名称精确路由到对应的
@Process方法
↑